Enterprise Data Warehouse

Data warehouses transform small and big businesses into digital enterprises: a central store of supplier, partner, customer, sales, supply-chain, and operations data lets an enterprise describe, plan, prescribe, and preempt business outcomes.

One such enterprise -- Exate Six Cages (ESC) -- has ~2PB of enterprise data collected over the last 11 years in a Hewlett Packard Neoview data warehouse -- a massively parallel processing (MPP) ACID engine. Neoview is now defunct (Hewlett Packard decommissioned it in 2011). ESC wants to rebuild its deltalake in the modern cloud rather than continue to survive on aging technology and hardware. Many enterprises have similarly invested in traditional on-prem warehouse implementations like Exadata, Netezza, and Teradata: in these COVID-19-induced digital-transformation times, they find themselves wanting to modernize to cloud-first deltalake implementations. Below, I highlight ESC's accelerated transformation journey from Neoview to Spark in under a month. Hopefully this overview can help other enterprises achieve a DW-to-DL transformation quickly.

Bronze, Silver, Gold Zones

ESC's legacy Neoview followed the traditional bronze, silver, and gold segmentation of data. It supported the full range of ACID MPP semantics. Three parallel instances of the ESC Neoview clusters round-robin between the (L)oad, (T)ransform, and (E)xtract shifts for continuous availability and durability --

Size

Loading

Data arrives in 0x1C-delimited, LZ4-compressed, UCS-2 text files every 15 minutes. The legacy database supports arrival-event triggers, but given the agreed velocity of 4 files/hour, this trigger functionality is seldom used in the Neoview implementation: instead, cron-based schedules kick off the ingress pipelines.

Structured Query Language (SQL)

The majority of loading, transformation, and extraction is written in Neoview SQL, albeit with a small portion of non-standard dialect (listed below) that calls for special attention.

  1. data type definitions like nchar(2) with width, scale, and precision schema-on-write (SoW) specifications
  2. transactional hints like for read uncommitted access in shared mode
  3. structural hints like table (disk label statistics tbl)
  4. SQL extensions like year to fraction and _ucs2'你好'
  5. non-ANSI extensions like transpose (a,x),(b,y),(c,z) as (k,v)
  6. schema-on-write specifications like insert into wtbl(col1, col2, col3) select (rcol1, rcol2, rcol3) from rtbl

Scheduling

The transformation and reporting pipelines use SQL. This SQL codebase was developed over a span of 11 years. Since the MPP engine is "always-on" during the transformation shift, it needs no bootstrap or wind-down overhead/latency when triggering micro-SQL transformation statements.

Hadoop, on the other hand, uses the YARN negotiator and Oozie scheduler to bootstrap resources for each job, which can accumulate into unacceptable startup delays unless a plurality of micro-SQL statements is assembled into fewer long-running DAGs. Forklifting SQL statements verbatim, without re-assembly into bigger DAGs, would create a perilous YARN negotiation/queuing challenge.

Enterprise Class

Neoview offers enterprise-class fault-tolerance, logging, monitoring, alerting, debugging, security/protection, lineage/provenance, maintenance, visual editing, and auditing support. Any replacement must, at a minimum, add to (not subtract from) these enterprise-class SLAs.

Summary Requirements

The destination deltalakehouse must -- in sum -- satisfy at least the following requirements --

  1. Provide schema-on-read flexibility (schema plasticity on ingress from heterogeneous sources). Relational, non-relational, hierarchical, temporal, and binary payloads must find multi-tenancy in the data lake.
  2. Provide schema-on-write ACID consistency; it must enable transactional CRUD semantics at block-level and cell-level with equal ease.
  3. Provide high-speed SQL ETL -- the full range of SQL92 compliance is preferred.
  4. Provide a lambda engine that allows both streaming and batch source integration.
  5. Provide enterprise grade data delineation and protection standards.
  6. Provide optimized storage, compression, and encoding formats for predicate pushdown.
  7. Provide for ad hoc, scheduled, and interactive drilldown via both JDBC and ODBC layers of integration.
  8. Provide in-situ advanced analytics.
  9. Provide enterprise-class logging, monitoring, debugging, and dataops capabilities.

Delta Implementation

Procuring on-prem iron for a deltalake is a challenge during COVID times, so I recommend using docker-compose (running on localhost) to avoid slowing development. In fact, I recommend this build for quick dataops irrespective of procurement challenges. The selected serverless/storageless services in the composition allow for a future-ready, cloud-first deltalake implementation: we can rapidly deploy deltaops to any cloud when development is complete. We deep-dive into the composition of the big data stack below.


Welcome to Self-Service Analytics Docker Appliance (SSAP)

Data is king: it's your data, have it your way! As you ingest, transform, curate, summarize, and consume your data, you will need handy tools and capabilities to develop your analytics. This instance of the docker compose is a "private sandbox" appliance aimed at helping you quickly spin up core enterprise analytics tools on your own host: this setup bootstraps analytical development on an accelerated basis and can ultimately be graduated into a production instance.

Architectural Components

The SSAP is composed of Hadoop -- a scalable, distributed big data platform -- and the rest of its supporting ecosystem:

While these components can be installed independently by developers, I package several of these applications (with their associated configurations) into a single service composition so developers can focus on ELT, data management, refinery, consumption, analytics, and data science functions rather than platforming. Additionally, since the majority of ingress sources into SSAP are Kafka (streaming) and files (batch), I prepackage notebook templates for managing simple integrations. I also demonstrate JDBC/ODBC/extract egress paths as reporting, dashboarding, and extraction guides.

Installation

To install, download and extract the zipped package to a location on your laptop (or another docker host with at least 16GB RAM). Ensure docker-compose is installed on the host. If you are running this on a Windows host, download and install Docker Desktop for Windows. On Windows, WSL2 is preferred. See instructions at WSL2 Installation Guide. Also ensure that docker uses WSL2 integration for better performance.

Enable WSL2 in Windows

Subsequently, start (select) services using --

docker-compose up --build [servicename] (spark|kafka|cdap|airflow|presto|<blank>)

Optional: Kompose

If you would rather run SSAP instance in the cloud, install kompose and convert the docker-compose.yaml file into kubernetes (k8s) deployment descriptions --

  1. Convert compose files into k8s yaml files using -- kompose convert docker-compose.yml
  2. A multitude of yaml files (one for each service) are created from that command. Assuming all the yaml files are available in the folder <kube>, instantly spin up the pods using -- kubectl apply --validate=false -f kube
  3. List of all the services and deployments can be listed as -- kubectl get services
  4. If you are using microk8s instead of kubernetes (in the cloud), follow its instructions and prefix kubectl with microk8s for these commands to work.

Component Services

In the compose file, you will notice the following services --

| Service | Hostname | URL | Description | Credentials |
|---|---|---|---|---|
| Spark | spark | http://<dockerhost>:8888/lab | The dominant service that hosts the JupyterLab, Hive Metastore, and Spark ETL services. Developers must use this service primarily. | |
| Beeline | spark | jdbc:hive2://<dockerhost>:10000/default;auth=noSasl | Primary thrift/JDBC interface for running Spark/Hive DML/DDL queries. | admin/admin |
| Minio | minio | http://<dockerhost>:9000/minio | Object storage for hosting deltalakehouse models. Both relational and object models are saved to Minio (instead of HDFS). | accesskey/secretkey |
| CDAP | cdap | http://<dockerhost>:11011 | Cask Data Application Platform -- a heterogeneous data integration/exploration canvas. | |
| Presto | presto | http://<dockerhost>:8080 | A distributed, high-speed, in-memory SQL analytical engine that can also act as a data virtualization layer. Supports JDBC, SQLAlchemy, and ODBC (requires license). | admin/admin |
| Superset | superset | http://<dockerhost>:8088 | A cloud-based self-service BI, exploration, and visualization tool that natively integrates with big data and Presto. | admin/admin |
| Airflow | airflow | http://<dockerhost>:10080 | As deltalakes manage numerous ETLR queries, it is imperative to visually schedule, manage, and monitor concurrent schedules. Airflow is a workflow management/job scheduling solution for big data. | admin/admin |
| Kafka | kafka | <dockerhost>:9092 | Batch data arrives in files; realtime streaming data arrives on a high-speed event message bus. Kafka is a high-performance, fault-tolerant, low-latency, distributed event streaming framework for building loosely coupled HA big data services. | SASL:PLAIN/PLAINTEXT:9092 |
| Zookeeper | zookeeper | <dockerhost>:2181 | HA services like Kafka rely on the distributed discovery, monitoring, and coordination mechanisms provided by Apache Zookeeper. | |

Docker

It is intended that the majority of the developer interaction is in the spark container. To see all docker services and their container IDs, use --

docker ps # check docker containers

Only one instance per service is created by default: if you wish to scale a service with replicas, spin up more with the docker-compose scale command. Ensure enough port ranges are published in the compose yml file to allow the replicas to service application requests.

 docker-compose up -d --scale minio=2 --scale presto=3

Jupyter

Jupyter is running on port 8888 with an empty password. If you want to set a password, you can do so as follows --

# On the host prompt
docker exec -it <container_id_of_spark> bash

# On the container prompt
jupyter notebook --generate-config
echo "c.NotebookApp.token = u'<PASSWORD>'" >> ~/.jupyter/jupyter_notebook_config.py
exit

 # On the host prompt again
docker restart <container_id_of_spark>

Both the new JupyterLab and the classic Jupyter Notebook interfaces are available at http://<dockerhost>:8888/lab and http://<dockerhost>:8888/tree respectively.

The composition -- on first bootstrap -- comes empty. There is a pre-supplied SparkStart.ipynb notebook in the /notebooks folder that will fetch airports/IATA data from the web and create a default.airports table in the warehouse. Services such as Superset contain a prebuilt Airports Dashboard that relies on an airports dataset in the default schema of Hive.

Jupyter Tree/Lab Interface

You can open a development terminal (to run Java, Maven, Git, Spark, Hive, Hadoop, Flume etc) using one of the two options below --

Hive

Hive is available in the spark container. In reality, hive is a pre-defined alias for spark-sql. To issue hive commands, you may either use the CLI terminal in the notebook or use --

# On the host terminal
docker exec -it <spark_container_id> hive

The thriftserver is running on port 10000. I used the binary thrift interface to service client requests. If you want an HTTP interface (because you wish to implement a Knox gateway), you may also run HiveServer2 in HTTP transport mode. The current thriftserver is defined as binary in sparkbox/entrypoint.sh --

Binary Interface

${SPARK_HOME}/sbin/start-thriftserver.sh --master=local[1] --driver-memory=1g \
    --hiveconf hive.server2.thrift.bind.host=0.0.0.0 \
    --hiveconf hive.server2.thrift.port=10000 \
    --hiveconf hive.server2.authentication=NOSASL \
    --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
    --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

or you can use the following for the HTTP Interface

${SPARK_HOME}/sbin/start-thriftserver.sh --master=local[1] --driver-memory=1g \
    --hiveconf hive.server2.thrift.bind.host=0.0.0.0 \
    --hiveconf hive.server2.transport.mode=http \
    --hiveconf hive.server2.thrift.http.port=10000 \
    --hiveconf hive.server2.http.endpoint=cliservice \
    --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
    --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

You can use standard tools like DBeaver to explore Hive catalog. The standard Hive JDBC driver works well with Spark.

Binary JDBC Protocol HTTP JDBC Protocol

Notice that the spark thriftserver was bootstrapped with delta extensions so mutable delta queries will work out-of-the-box.

Updates allowed on Spark Deltalake

You can also use ODBC drivers to connect Microsoft PowerBI/Excel to Hive. Cloudera (or Simba, CData) drivers require a freemium license to enable ODBC functionality.

Spark PowerBI Connection ODBC Access to Spark

You can use DirectQuery option to pushdown queries to Hive/Spark.

Sample PowerBI Dashboard

Additionally, you can use sqlalchemy (via pyhive module) to query Hive tables directly as well. Spark, of course, can also read hive tables using standard spark.table('default.airports') syntax.

Query Hive/Spark directly from notebook

Presto

Presto has two flavors -- PrestoDB and Trino -- very similar engines forked separately by the original Presto contributors. I used the Starburst Presto image in the composition file.

Presto is a high-performance, distributed SQL query engine for big data. Presto's architecture allows users to query a variety of data sources such as Hadoop, S3, GCS, Azure, MySQL, Cassandra, Kafka, and MongoDB. In fact, any generic JDBC source will work, although this is not enabled in the docker composition. Presto can also query multiple data sources within a single query (virtualization). This data virtualization layer -- along with its high-speed pipelined execution engine -- makes Presto a pivotal element for integrating the heterogeneous service surface of the SSAP appliance: it makes SQL Server, Vertica, Postgres, Hive, S3, and Kafka catalogs appear seamlessly as a single data asset without prior physical data movement.

Presto DBeaver Configuration Speedy Cross Database Queries with Presto
Querying Hive/Spark Querying Presto

You can also of course use ODBC drivers to connect PowerBI/Excel tools to Presto. I do not have a free ODBC driver (PowerBI only works with ODBC sources) for Presto, so unless you have an enterprise license for CData, Simba, or Starburst Presto ODBC drivers, the PBI based interaction with Presto is unfortunately limited.

To query Presto from the notebook, pyhive connectors are pre-installed in the container.

Query Presto directly from notebook

Currently Presto is only configured with the local Hive catalog (running atop Minio object storage), but since Presto supports many more data sources -- GCP, Kafka, MySQL, SQL Server, Postgres, Accumulo, Redshift, Elasticsearch, log files, etc. -- you may add more catalogs by following the Connector Guides documentation. The catalog folder in the root namespace of the docker host can be modified to accommodate new connector catalogs.

Superset

Superset is an Apache licensed open-source cloud BI solution that supports -

Of particular note are three sections --

Presto Connection in Superset Spark Connection in Superset
Query Databases from Superset
Build and socialize dashboards in Superset

CDAP

Cask Data Application Platform (CDAP) is an open-source application development platform for Hadoop. It addresses a broad range of real-time and batch data integration needs with a visual canvas, and provides seamless assemblage and devops of complex pipelines into production. Cask, now owned by Google, offers a managed code-free data integration service (branded Cloud Data Fusion on the GCP platform). We integrate the open-source version into our SSAP appliance.

Wrangler

Wrangler is a visual canvas to explore, cleanse, shape, profile, and enrich a variety of data sources (like databases, Kafka streams). You should be able to import (add connection) from data sources and quickly shape, describe, cleanse, and infer data quality.

Add data connection Explore Data

Replication

CDAP's Replication module makes it easy to replicate live data from operational databases like MySQL, SQL Server, Oracle, and SAP HANA into a big data environment like Hadoop. For anyone who has attempted this manually, the chores of maintaining waterline markers, ensuring idempotent changes, designing monotonic index columns, maintaining change logs, and managing parallel data replication threads can be daunting. The replication module makes table-level "follow-changes" semantics easy to implement on Hadoop.

Enabling realtime replication from Operational Stores to Deltalake

Hub

CDAP's Hub offers a single managed interface to import, integrate, and leverage a myriad of cloud services and databases. If a pre-provisioned visual icon for a datasource does not exist (say Marketo or Anaplan), CDAP connectors can be easily downloaded and dynamically configured into CDAP.

Add a myriad of plugins, sources, and databases on the fly

Studio

CDAP's Studio -- the core competency of CDAP -- is the visual ETL canvas for quickly assembling complex extract-transform-load (source-wrangle-sink) routines graphically; optionally you can also deploy these pipelines for either real-time or batch integration. Data engineers can schedule pipelines and attach alerts/triggers from the same canvas. Draft pipelines can also be replicated, saved, shared, undeployed from the List view.

Develop complex data pipelines visually in CDAP Studio

Airflow

Apache Airflow is an open-source workflow management platform written entirely in Python, originally at Airbnb. Airflow allows data engineers to programmatically author, schedule, and monitor their workflows via a user interface. Additionally, Airflow offers a multitude of data integration connectors (a la CDAP) to clouds like SFDC, GCP, AWS, and Azure. While there is some redundancy in process-flow and data-flow authoring/monitoring between CDAP Studio and Airflow, I combine both services in the composition to give users flexibility and choice. Google offers a managed Airflow service under the Cloud Composer branding, as does Amazon.

Airflow schedules and executes jobs as "DAGs" (directed acyclic graphs) -- compositions of tasks. A sample DAG -- below -- appears when Airflow automatically (watches and) picks up Example_DAG.py from the ${AIRFLOW_HOME}/dags folder (mounted from the host's airflow/dags).

Directed Acyclic Jobs (composition of tasks) scheduled by Airflow

Flume

Apache Flume is a reliable, durable, and highly available watcher/transport bus for implementing complex enterprise integration patterns. Flume organizes large continuous and batch data integrations into simple "pipelines" managed by simple "agents". It is possible to cascade multiple pipelines into complex, distributed orchestrations with flexible fault-tolerant policies. FlumeNG is installed as a simple binary in the composition. Later, we show how ESC uses Flafka and Flume to integrate realtime Kafka and file sources respectively. CDAP already provides visual ingress for these beam (batch + streaming) sources, albeit some integrations are best handled via command-line services like Flume.

Minio

Minio is an open-source high-performance S3-compatible object-storage implementation that provides inbuilt scaling, replication, versioning, locking & concurrency controls, and erasure coding.

HDFS already offers scaling and erasure coding, but assigning storage to HDFS up-front preempts other uses/services. Multiple enterprises/clouds now offer storageless solutions: dedicating a quota a priori to HDFS is suboptimal considering the majority of workloads (including location-aware MPP primitives) are moving to serverless (ephemeral/short-lived container) solutions like Amazon's EMR and Google's Dataproc. Since Minio can gateway to existing HDFS, NFS, and cloud buckets just as easily as it stripes object models across physical disks, I recommend switching to object storage now: you can transition existing HDFS implementations via the gateway pattern if needed.

Despite my initial resistance, Vamsi Paladugu is a visionary who convinced me -- in 2019 -- that these serverless/storageless k8s/object-storage models are indeed the future state of deltalakes.

I enable simple object storage in the composition via managed docker volumes. Two buckets -- hdfs and spark -- are created up-front on boot in entrypoint.sh. Minio buckets can be accessed/managed visually via the Minio UI, or from the CLI --

# minio is the host alias defined as following in the composition
# MINIO_HOST=minio:9000
# ACCESS_KEY=accesskey
# SECRET_KEY=secretkey

mc alias set minio ${MINIO_HOST} ${ACCESS_KEY} ${SECRET_KEY} --api S3v4

# Manipulate objects and buckets on minio
mc ls minio/            # List buckets
mc mb -p minio/spark    # Make a bucket
mc tree minio/spark     # List tree hierarchically

The composition defines both spark.sql.warehouse.dir and hive.metastore.warehouse.dir as s3a://spark/warehouse/ in the core-site.xml -- so both delta and (un)managed tables land in the spark bucket. The fs.s3a.endpoint is defined as http://minio:9000/. When a sample delta table is defined as follows, the objects on s3a appear as shown below --

-- Spark DDL
CREATE TABLE airports_delta LIKE airports 
USING delta 
LOCATION 's3a://spark/warehouse/airports_delta';
Table data stored as objects in the spark bucket on Minio

Furthermore, the property fs.s3a.impl is defined as org.apache.hadoop.fs.s3a.S3AFileSystem in the core-site.xml. This enables you to manage (copy, delete, move, create) s3a namespace via HDFS commands --

hadoop fs -ls s3a://spark/*     # spark is the bucket name

# To copy files between file systems (including distcp)
hadoop fs -cp images/minio_objects.png s3a://hdfs/

hadoop fs -ls -R s3a://spark/    # To list files using HDFS tools

Copying-Exact vs Refactor Choice

The existing codebase in ESC's Neoview EDW -- built over the last 11 years -- must be converted to the new deltalake on Spark 3. ESC first chose to refactor the entire business logic (BLOC) from scratch -- but was daunted by the size of the effort. The manual analysis, reauthoring, and buy-in from the business domains were overwhelming, considering Dexcan Consultants (ESC's IT partner) provided a very short support window (under 6 months) to cut over the Neoview servers to Spark.

I recommended the copy-exact method instead -- while this inherits the BLOC/architectural debt, the historic archetype provides an easier guideline with replicate-first-fix-later guarantees. I built tools to automate the translation of the existing BLOC into Spark ANSI SQL while also ensuring the stated requirements are fulfilled.

To illustrate the automation, and to protect ESC's IP here, I use sample queries to demonstrate DDL, DML, reporting, and airflow reconstruction on the deltalake. The queries are slightly adapted (augmented) for better coverage. The modified files are available in the sql_queries folder.

Spark Translator Preamble

The Spark context is created. The UI should be visible at http://<dockerhost>:[4040-4043]

Load SQL Queries

Read the as-is codebase

Metadata in Queries

Extract headers and collate lines. Headers contain important info like developers, dates, RFCs, etc.

Analyze SQL Script

A script can contain multiple statements and comments. Rip these statements apart and analyze them sequentially.

View Query Statement Spread

For all the given queries, visualize the DDL (Create Statements), DML (Insert/Update/Delete Statements), and Reporting (Select statements) distribution.

DDL Definitions

Before copying-exact from Neoview tables and SQL, I need to create target table structures in Spark that map 1:1. I must capture all data definition templates (aka create table statements): a proper destination receptacle for the original structures is needed first.

Scan DDL Template Signatures

DDLs always follow the CREATE TABLE|DATABASE [IF NOT EXISTS] TABLENAME|DBNAME [(COL COLTYPE [CONSTRAINT], ...)] MORE TEXT pattern.
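The pattern above lends itself to a regex scan. The sketch below is illustrative (names and the rule set are simplified): it pulls the object kind, name, and raw column clauses out of matching statements. Nested type precision like DECIMAL(10,2) would defeat the naive comma split, so a production translator would need a real parser (e.g. pyparsing).

```python
import re

# Simplified scanner for CREATE TABLE|DATABASE [IF NOT EXISTS] ... patterns
DDL_RE = re.compile(
    r"CREATE\s+(TABLE|DATABASE)"
    r"(?:\s+IF\s+NOT\s+EXISTS)?"
    r"\s+(?P<name>[\w.]+)"
    r"(?:\s*\((?P<cols>.*?)\))?",
    re.IGNORECASE | re.DOTALL,
)

def scan_ddl(script: str) -> list[dict]:
    """Collect object kind, name, and raw column clauses from DDL text."""
    out = []
    for m in DDL_RE.finditer(script):
        cols = m.group("cols")
        out.append({
            "kind": m.group(1).upper(),
            "name": m.group("name"),
            # naive split: breaks on DECIMAL(10,2) -- see caveat above
            "columns": [c.strip() for c in cols.split(",")] if cols else [],
        })
    return out

print(scan_ddl("CREATE TABLE IF NOT EXISTS sales.orders (id INT NOT NULL, status STRING)"))
```

Each captured template becomes a row in the structural inventory that the translation step consumes.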

Convert to Spark DDL

We have the structural intel of the tables. Spark (and Hive) have limited coverage of data integrity semantics, so I provide just-enough translation (of data types) and omit constraint definitions in Spark to receive the original Neoview data.

Project CTAS Statements

In order to create hydration structures for copying existing data from Neoview to Spark, all the necessary DDLs must be pre-created. I propose using a simple external-table design first (to receive untyped string data) so all files (irrespective of semantics and constraints) can reach the aluminium* layer -- the aluminium layer allows for true schema-on-read, accept-first-reject-later BLOC. Furthermore, I also propose creating a true bronze layer in delta format (with stronger typing and semantic rules) so BLOC integrity (and schema-on-write rules) can be enforced.

L0 Aluminium Layer to Allow Beam Data Quarantine

Spark Catalyst Parser Validation

Translation of DDLs, DMLs, and reporting queries into ANSI SQL is fraught with challenges: I cannot be assured that all translated queries are indeed executable in Spark. To validate logically -- not physically (because the destination warehouse tables do not yet exist) -- I can use the logical plan from Spark's catalyst parser. The JSON output of the catalyst parser provides an excellent carte of query intelligence (UDFs, tables, fields, SQL hints, etc.) that can itself be used to scan/inventory the EDW codebase.

The catalyst logical parser is strict -- the code must be syntactically proper SQL, even if not semantically valid, before its capabilities can be used. Our effort nevertheless aims to convert to proper SQL to leverage catalyst fully.

Hive/Spark CTE

Creating aluminium and bronze raw zones for data ingress. Only create tables; databases are pre-created manually.

Fire Up Table Creations

Tables Created in Deltalake Storage Locations Primed in Minio

Ingestion to Raw Layers

Neoview receives file sources every 15 minutes. The design above mandates ingress into the "external" aluminium zone first on Spark -- imagine this zone as a quarantine layer for qualifying data. From there, the data is BLOC-validated periodically and naturalized into the bronze layer. Assuming streaming data flows through Kafka and batch data is staged to pre-destined folders, the following Flafka configuration allows simple beam (batch + streaming) ingestion. Of course, CDAP also allows visual integration from heterogeneous sources.

Continuous Data Pipelines into Deltalake

Below, I emulate sample realtime data -- a ticker event stream published to a test topic.

External Table Mapped to Receive Streaming Data from Flafka. Empty at first.

Managed Flafka Ingestion

Author a simple pipeline to collect, aggregate, and flush records to Minio/HDFS.

Run Mock Events

Write some mock data to demonstrate how aluminium tables (schemaless tables) can automatically collect new streaming data as it happens.

As data is either continually streamed or incrementally accumulated in batches, the data readily reflects in the aluminium zone.

Storage Locations Primed in Minio Raw Aluminium Tables Automatically Refresh New Data

Incremental Idempotent Ingress

The data has thus far only entered the aluminium zone -- a presumed quarantine zone. The deltalake logic begins only at the bronze layer. For data to graduate from aluminium to bronze, scheduled jobs must insert-into-bronze(B)-select-new-data-from-aluminium(A) regularly. While this is easy to comprehend, it is also imperative that these graduation jobs neither reprocess data redundantly nor let unprocessed data slip through. Especially when these jobs run many times per day with speculative execution (and concurrency), idempotency demands A === B: $(A_1 - B_1) = \emptyset$, $(B_1 - A_1) = \emptyset$, $(A_1 \cap A_2) = \emptyset$, and $(B_1 \cap B_2) = \emptyset$, where $A_1$ and $A_2$ are the records selected in two separate runs of the job, and $B_1$ and $B_2$ are the records inserted in two separate runs. The jobs must be smart enough to skip previously processed files/data (B) and only process newly sourced information (A).

Where the Databricks implementation supports copy into syntax, the open-source implementation running atop Minio or ESC's on-prem hardware lacks this convenience. To overcome this, waterlining ingress through a control table is recommended.
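The waterline pattern can be sketched as follows. This is a hedged illustration, not ESC's actual schema: a control table (here an in-memory SQLite stand-in with invented table/column names) records every file already graduated, so re-runs select only unprocessed files and satisfy the idempotency conditions above.

```python
import sqlite3

# Control table records every file already graduated aluminium -> bronze.
# Table and column names are illustrative, not ESC's real schema.
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE load_control (file_name TEXT PRIMARY KEY, loaded_at TEXT)")

def graduate(files: list[str]) -> list[str]:
    """Return only the files not yet processed, and mark them processed."""
    done = {row[0] for row in con.execute("SELECT file_name FROM load_control")}
    fresh = [f for f in files if f not in done]
    con.executemany(
        "INSERT INTO load_control VALUES (?, datetime('now'))",
        [(f,) for f in fresh],
    )
    con.commit()
    return fresh

print(graduate(["drop_0100.dat", "drop_0115.dat"]))  # both are new
print(graduate(["drop_0115.dat", "drop_0130.dat"]))  # only drop_0130.dat is new
```

In production the control table would live in the deltalake itself and the insert-select into bronze would run in the same transaction as the waterline update.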

Author and Schedule Loaders in Airflow

Airflow accepts a cumulation of DAGs in the form of a zip file. All loader airflows are combined into a singular package and published to the ${AIRFLOW_HOME}/dags folder.

Automatic Watches & Schedules

With the persistence of the zip package containing the loader jobs in the airflow DAGs folder, the jobs appear automatically on the airflow canvas.

Airflow DAGs are automatically scheduled Delta Idempotent Load Jobs Run on Spark Deltalake

ETL Lineage Analysis

Like most data warehouses, ESC's implementation turned into a spaghetti mess of many tables and scripts. The exact lineage and pedigree of the bronze, silver, and gold layers is now extremely convoluted and non-linear. For two reasons -- first, to weed out unnecessary traces, and second, to improve the performance of the linear ETL -- it was imperative to untangle the big knotted DML codebase for reengineering. Below is a snapshot of one of the 50 reporting-table lineages currently served by the ESC EDW.

The complexity of the existing ESC's Neoview DML
You expect this... You find this...

Parsing the lineage will help determine the central hub-spoke models in the warehouse: it will also establish prioritization of the sources, consumers, engineering for transformation.

Rip the DML Codebase

The code is organized into scripts/files. Each script contains multiple statements. Each statement -- like Phrase Structure Rules -- can itself be a composition of multiple sub-statements. Each sub-statement (or statement) can have a descendancy lineage that transforms base tables (selecting data) into child tables (create-insert-delete-update) with SQL.

Untangling means capturing every sub-statement's parent-child lineage terminals and unwinding them to trace the ordering and continuity of data.
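Once each statement's (sources, target) terminals are captured, chaining them recovers the load order of the warehouse layers. The toy sketch below uses the standard library's topological sorter; the statement tuples are invented for demonstration, not drawn from ESC's codebase.

```python
from graphlib import TopologicalSorter

# Each DML statement yields (source tables, target table) terminals.
# These edges are invented examples of a bronze -> silver -> gold chain.
edges = [
    ({"raw.orders"}, "bronze.orders"),
    ({"bronze.orders", "bronze.customers"}, "silver.order_facts"),
    ({"raw.customers"}, "bronze.customers"),
    ({"silver.order_facts"}, "gold.revenue_report"),
]

# node -> set of prerequisites, in the shape TopologicalSorter expects
graph = {target: sources for sources, target in edges}
order = list(TopologicalSorter(graph).static_order())
print(order)  # base tables first, reporting tables last
```

The same ordering determines which graduation jobs can run concurrently and which must wait on upstream layers.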

ANSI SQL

Not all scripts and statements are directly translatable to Spark; nevertheless, when ESC has ~15K scripts and nearly ~100K statements, it is extremely daunting to manually curate the queries and analyze their terminals: this is why ESC struggled with EDW retirement. Below, we apply a series of automatic pipelined transformations to convert the Neoview statements.

Pipeline of transformations

  1. Find parameters in the statement. ESC used the %(variable_parameter)s pre-processing macro to parameterize the jobs.
  2. Capture the insert into order of columns so non-nullable/nullable imputations can be applied later.
  3. Impute placeholder values for parameters (for completion semantics). This helps check SQL validity: if valid SQL is found, we can use the Catalyst parser to extract query intel too.
  4. Make lexical replacements (using regular expressions), like escaping quoted strings and converting current → current_timestamp, create volatile table → create temporary view, etc.
  5. Scan user-defined functions in the code to build a UDF library later.
  6. Make semantic replacements like transpose → flatten-explode etc.

Parameters

Scan for pre-processing parameters in the query.
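A minimal sketch of the scan, assuming the %(name)s macro shape described above; the parameter names in the usage are illustrative.

```python
import re

# Matches ESC's %(name)s pre-processing macro.
PARAM_RE = re.compile(r"%\((\w+)\)s")

def scan_parameters(sql: str) -> list:
    """Return the distinct macro parameter names in order of first appearance."""
    seen, params = set(), []
    for name in PARAM_RE.findall(sql):
        if name not in seen:
            seen.add(name)
            params.append(name)
    return params
```

For example, a statement filtering on %(run_date)s and %(region)s yields ["run_date", "region"].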

Escapes and Quotations

Syntactic deviations like non-escaped quotations and decorative ASCII spacing must be preserved to keep code integrity. Before any pipeline transformation can violate the integrity of the code, ensure the text is correctly escaped so it can be unescaped later.
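One common way to protect quoted content is to mask literals before the regex passes and restore them afterwards. This is a sketch under the assumption of single-quoted SQL literals with '' as the embedded-quote escape; the sentinel scheme is my own choice, not ESC's.

```python
import re

# A single-quoted SQL literal; '' inside is an escaped quote.
_QUOTED = re.compile(r"'(?:[^']|'')*'")

def mask_literals(sql: str):
    """Stash quoted literals behind numbered sentinels so later regex
    transforms cannot corrupt string contents. Returns (masked_sql, literals)."""
    literals = []
    def _stash(m):
        literals.append(m.group(0))
        return f"\x00{len(literals) - 1}\x00"
    return _QUOTED.sub(_stash, sql), literals

def unmask_literals(sql: str, literals) -> str:
    """Restore the stashed literals once transformations are done."""
    for i, lit in enumerate(literals):
        sql = sql.replace(f"\x00{i}\x00", lit)
    return sql
```

Any transform applied between mask and unmask sees only the sentinels, so a literal that happens to contain, say, "create volatile table" is left untouched.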

Simple Syntactic Intelligence

Thankfully, ESC's Neoview code did not include any stored procedures or triggers. In fact, the coding standards were strict and allowed no lax syntactic liberties. That strict enforcement lets us scan (and translate) simple code patterns (like temporary tables and insert statements) with simple regular expressions.

Regex Replacements

The majority of ESC's code is expressed in a very finite language variant. Below are the simple regex replacements that are prevalent in the codebase.
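A representative subset of the lexical rules, sketched as an ordered rule table; ESC's full table is larger, and the two rewrites shown are just the ones named earlier in this article.

```python
import re

# Ordered (pattern, replacement) rules; a subset for illustration.
LEXICAL_RULES = [
    (re.compile(r"\bcreate\s+volatile\s+table\b", re.I), "create temporary view"),
    (re.compile(r"\bcurrent\b", re.I), "current_timestamp"),
]

def apply_lexical_rules(sql: str) -> str:
    """Apply each lexical rewrite in order over the statement text."""
    for pattern, replacement in LEXICAL_RULES:
        sql = pattern.sub(replacement, sql)
    return sql
```

Word boundaries keep the rules from firing inside longer tokens (e.g. current_timestamp itself), which is why literal masking should precede this pass.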

Parameter Replacements

Hardcoded parametric macros throw off the logical plan of Spark's Catalyst parser. To satisfy the parser, impute some meaningful values -- placeholders, even if "in-name-only".
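The imputation itself can be a one-line substitution. The default value below is illustrative; a real implementation would pick type-aware defaults per parameter.

```python
import re

def impute_parameters(sql: str, default: str = "'1'") -> str:
    """Swap every %(name)s macro for a literal so the statement parses cleanly.

    `default` is an in-name-only placeholder, not a semantically valid value.
    """
    return re.sub(r"%\(\w+\)s", default, sql)
```

With the placeholders in, the statement becomes complete SQL that a parser can validate and analyze.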

Semantic Replacements

Occasionally there are statements, like transpose, that are not implemented in Spark: they are instead expressed with explode/flatten syntax. These are a little more complicated to address; nevertheless, with some pyparsing dialect authoring, it is easy to write complex semantic transforms too.

Conversion

Assemble all the code translator routines and convert the input codebase.
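The assembly step amounts to piping each statement through the translator routines in order. This sketch uses simplified stand-ins for the real routines (comment stripping, parameter imputation, and the volatile-table rewrite); the actual pipeline has more stages.

```python
import re

def strip_comments(sql: str) -> str:
    """Drop single-line -- comments."""
    return re.sub(r"--[^\n]*", "", sql)

def impute_params(sql: str) -> str:
    """Replace %(name)s macros with a placeholder literal."""
    return re.sub(r"%\(\w+\)s", "'1'", sql)

def rewrite_volatile(sql: str) -> str:
    """Translate Neoview volatile tables to Spark temporary views."""
    return re.sub(r"\bcreate\s+volatile\s+table\b", "create temporary view",
                  sql, flags=re.I)

def convert(sql: str) -> str:
    """Run one statement through the ordered translator routines."""
    for step in (strip_comments, impute_params, rewrite_volatile):
        sql = step(sql)
    return sql.strip()
```

Mapping convert() over all ~100K statements yields the converted codebase in one automated pass.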

Lineage Build

Now that we possess good ANSI-compliant SQL, it is important to identify the parent-child lineage for each statement/transform. The select parser example offers great building material for a parser that scans for columns, tables, UDFs, projections, etc.

We are only interested in the names of the tables that are in the parent and child chains of the statement.

Compose simple patterns for lineage capture

Capture the parent/child tables for every statement using pyparsing rules defined above.
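As a simplified stand-in for the pyparsing grammar, the terminal capture can be approximated with two patterns: the written table is the child, and anything selected FROM or JOINed is a parent. A real dialect (subqueries, aliases, CTEs) needs the full grammar.

```python
import re

# Child: the table a statement writes to.
CHILD_RE = re.compile(
    r"\b(?:insert\s+into|create\s+(?:temporary\s+view|table))\s+([\w.]+)", re.I)
# Parent: any table the statement reads from.
PARENT_RE = re.compile(r"\b(?:from|join)\s+([\w.]+)", re.I)

def capture_lineage(sql: str):
    """Return (children, parents) table-name sets for one statement."""
    children = set(CHILD_RE.findall(sql))
    parents = set(PARENT_RE.findall(sql)) - children
    return children, parents
```

Each statement thus contributes a small set of parent→child edges; accumulated over the codebase, those edges form the lineage graph.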

Substatements

If a statement like select from (select A) union (select B) contains sub-select expressions, we can extract that intelligence too -- all the way to the token level.
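Extracting the sub-select expressions needs balanced-parenthesis tracking rather than a flat regex; a minimal sketch (ignoring parentheses inside string literals, which masking handles earlier):

```python
def extract_subselects(sql: str):
    """Pull out parenthesized sub-select bodies, including nested ones,
    by tracking balanced parentheses."""
    subs, stack = [], []
    for i, ch in enumerate(sql):
        if ch == "(":
            stack.append(i)
        elif ch == ")" and stack:
            start = stack.pop()
            body = sql[start + 1:i].strip()
            if body.lower().startswith("select"):
                subs.append(body)
    return subs
```

Nested sub-selects surface innermost first, which is convenient for bottom-up lineage analysis.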

Capture UDFs and Parentage/Childage

Function Library

Not all functions from Neoview are available in Spark (or vice versa). I scanned all the functions in use across the Neoview code and packaged a few simple replacement functions for Spark.
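The inventory step can be sketched as a frequency scan of function-style tokens; the function names in the usage below are illustrative, not a verified list of either engine's built-ins.

```python
import re
from collections import Counter

def scan_functions(statements):
    """Count function-style tokens (a name followed by '(') across the
    codebase, so gaps against Spark's built-ins can be inventoried."""
    calls = Counter()
    for sql in statements:
        calls.update(m.lower() for m in re.findall(r"\b(\w+)\s*\(", sql))
    return calls
```

Sorting the counter by frequency prioritizes which missing functions to package as Spark UDFs first.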

Sequence into Airflow

Now that we have the terminals for all sub-statements/transforms inventoried (and the function gaps filled), the sequence of operations to execute the ETL curation needs to be assembled. Break out every parent and child (where applicable to the query) and build dependency traces.

Circuitry

Find the full trace of the circuitry/lineage by using NetworkX.

NetworkX

The hub-spoke, centrality, dependency, motif-finding, and transitive-closure analysis in NetworkX is awesome. We will use NetworkX for full transitive analysis of the codebase.
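A toy sketch of the analysis, assuming NetworkX is installed; the table names are illustrative bronze/silver/gold stand-ins, not ESC's schema.

```python
import networkx as nx

# Toy lineage edges (parent -> child), as captured from the statements.
edges = [
    ("bronze.orders", "silver.orders"),
    ("bronze.items", "silver.items"),
    ("silver.orders", "gold.sales"),
    ("silver.items", "gold.sales"),
]
graph = nx.DiGraph(edges)

# Transitive closure of "what ultimately feeds this table?"
upstream = nx.ancestors(graph, "gold.sales")

# A topological order is a valid execution sequence for the ETL.
order = list(nx.topological_sort(graph))
```

The same graph also answers centrality and hub-spoke questions (e.g. nx.degree_centrality) to prioritize which tables to reengineer first.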

Visualize the Lineage

The above chart is of course very confusing. Beyond the fact that there are two islands -- one presumably the ticker table, the other presumably the music schema -- it is hard to tell if there is any value to this visualization.

Immediately, the categorization of bronze, silver, and gold tables becomes clearly evident. Furthermore, the implied parallel and sequential paths of materialization are evident.

Subgraphs (DAG Islands)

If the entire codebase in ESC's EDW is one big goo of spaghetti, it is imperative that all the statements execute in that same order (and in unison) to preserve the durability and integrity of the data. Nevertheless, as ESC's EDW serves multiple domains, it is reasonable to assume the dependency complexity is limited/isolated by domain. To separate the jobs, we must identify the subgraphs (disconnected islands) in the circuitry and author them as separate Airflow jobs.

2 Subgraphs

It is clear that the entire codebase spaghetti (of this toy queryset) has two distinct ETL islands. Now I reauthor the two islands as two distinct Airflow jobs, each with an independent cadence. Note that two is a very small number compared to ESC's ~350 islands.

Inversion

Instead of running the bronze, silver, and gold lineage from left to right (thereby executing unnecessary traces), we will run the lineage from right to left. This empowers us to preemptively prune unnecessary transformation lines in the EDW.
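The inversion can be sketched with NetworkX's ancestor query: starting from the requested gold targets, keep only the nodes that feed them and drop every other transformation line. Assumes NetworkX; table names are illustrative.

```python
import networkx as nx

def prune_to_targets(graph, targets):
    """Walk the lineage right-to-left: keep only the nodes that feed the
    requested target (gold) tables, pruning every other line."""
    keep = set(targets)
    for t in targets:
        keep |= nx.ancestors(graph, t)
    return graph.subgraph(keep).copy()
```

Running the pruned graph instead of the full one avoids materializing tables nothing downstream ever reads.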

Code Flow

We have to rebuild the flow paths; so it is imperative that we bring back the SQL code into the graph for reassembling flows in Airflow.

ETL SQL in Airflow

Now that we know there are two independent islands (one the ticker domain, the other the music domain) in the codebase, we automatically author the two Airflow DAGs.
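The DAG authoring can be sketched as plain code generation: render each island's topologically sorted statements into an Airflow DAG file. The template below is illustrative, not ESC's production generator; the operator import path and the jdbc_conn_id parameter name follow the JDBC provider pattern and may vary by Airflow version.

```python
# Template for a generated per-island DAG file.
DAG_TEMPLATE = '''\
from airflow import DAG
from airflow.providers.jdbc.operators.jdbc import JdbcOperator
from datetime import datetime

with DAG("{dag_id}", start_date=datetime(2021, 1, 1), schedule_interval="@daily") as dag:
{tasks}
'''

TASK_TEMPLATE = (
    '    t{i} = JdbcOperator(task_id="{tid}", jdbc_conn_id="{conn}", '
    'sql="""{sql}""")\n'
)

def render_island_dag(dag_id, conn, ordered_statements):
    """Render one island's topologically sorted statements as a DAG file."""
    tasks = "".join(
        TASK_TEMPLATE.format(i=i, tid=f"stmt_{i}", conn=conn, sql=sql)
        for i, sql in enumerate(ordered_statements)
    )
    # Chain the tasks in execution order: t0 >> t1 >> ...
    chain = "    " + " >> ".join(f"t{i}" for i in range(len(ordered_statements))) + "\n"
    return DAG_TEMPLATE.format(dag_id=dag_id, tasks=tasks + chain)
```

Writing each rendered string to its own .py file (and zipping, as with the loaders) publishes one independent DAG per island. The linear chain shown here is a simplification; the real generator wires the full dependency trace.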

Voila

The ETL DAGs -- scheduled to run daily -- appear in Airflow and run refinery operations against the deltalake. Furthermore, enterprise features like logging, alerting, monitoring, security, scaling, and credential protection are built into Apache Airflow.

Airflow DAGs scheduled | Airflow DAGs executing

Rationing Compute

In a large EDW implementation, it is plausible that a shared deltalake foundation serves multiple business domains. If each domain has different SLA/OLA needs, riding all the workloads on the same resource negotiator can be a challenge.

Spark manages resource allocation via (YARN) queues. While different queues can be pre-assigned to business domains to manage OLAs differently, the underlying YARN framework itself poses an inherent challenge with its sluggish bootstrap negotiation. Airflow's SparkSQL operator currently submits each task as a YARN job unless the job is authored as a lumpsum HQL file. Given the substantially large number of tasks per DAG, introducing YARN negotiation latency into every task leads to unnecessarily sluggish job execution. So how do we manage OLAs and capacity allocations differently without resource contention?

Airflow also offers the JdbcOperator. As you may have noticed, I used a connection id in the JdbcOperator for both the loader and ETL DAGs. These JDBC connections are serviced by Spark via the HiveServer2 (Thrift or HTTP) interface. Those services were bootstrapped a priori with an assigned capacity: master and executor memory, executor instances, impersonation, transport security details, etc. The thriftserver is "always-on": it offers dedicated capacity and a transactional/interactive experience to clients. In ESC's deltalake implementation, I shunned the YARN queues altogether and instead spun up multiple instances of the thrift service across different ports (and clusters).

Each of that dedicated thrift service (assigned to business domains like finance, sales etc) then received a "Connection ID" in the Airflow console. With pluggable conn_id parameter in the JdbcOperator, I could federate complex (and elastic) OLAs across the deltalake.

Airflow Connection ID setting | Airflow Connection console

In the docker composition, the connection id is not defined in the UI; instead, docker-compose defines it via an ENV setting (look in the env section of the compose file). You do not have to configure it in the GUI, but if you wish to, use the following configuration.

Airflow Connection ID Setting

Reporting

There are three paths of data egress from the ESC's EDW platform.

  1. File extracts for downstream datamart integration. These files must be dropped in csv/xlsx form to a shared folder.
  2. The deltalake must interoperate with the Business Objects universe for ad hoc query design.
  3. Interactive business intelligence (Qlik & PowerBI) integration via JDBC/ODBC paths.

While the full scope of downstream integration and egress patterns is not covered here, know that --

  1. There is dedicated Presto capacity for serving high-speed data from the deltalake to BI and semantic layers. Understandably, Presto does not directly consume delta tables. But with manifest integration, direct querying and fast analytics (via JDBC and ODBC) are possible from Presto.
  2. The delta tables are parquet underneath; and registered in the Hive metastore. So both Spark and Pandas (via JDBC/SQLAlchemy) can egress data quickly from the deltalake. A sampler notebook is provided in the composition that reads delta tables and stages downstream tables.
  3. Integrations with PowerBI, Excel, Qlik, Jupyter, Pandas, Spark, and ADO.NET have been validated, and playbooks are published.